掌握 Kafka 基础知识,轻松处理海量数据! 您所在的位置:网站首页 kafka send callback 掌握 Kafka 基础知识,轻松处理海量数据!

掌握 Kafka 基础知识,轻松处理海量数据!

2023-04-15 17:19| 来源: 网络整理| 查看: 265

一、什么是Kafka1.1 Kafka的定义

Kafka是一种分布式的消息系统,用于实现高可靠性、高吞吐量、低延迟的数据传输。可以把Kafka想象成一个邮局,生产者(相当于寄信人)把消息(信件)发给Kafka,消费者(相当于收信人)从Kafka中获取消息(信件)。这个过程可以实现多生产者、多消费者、多主题的消息传递。

例如,有一个电商网站,用户在线下单后,需要将订单信息通知到多个模块,如库存模块、物流模块、财务模块等。如果通过直接调用这些模块的接口进行订单通知,则每个接口都需要等待通知完成后才能继续执行,严重影响处理效率。而使用Kafka,则可以将订单信息发送到Kafka中,各个模块作为消费者从Kafka中获取订单信息,不会因为等待而阻塞。这样可以提高整个系统的处理能力和响应速度。

1.2 Kafka的特点

Kafka具有以下特点:

高性能:Kafka能够提供每秒数百万级别的消息传输,可适应高吞吐量的数据处理场景。可扩展性:Kafka支持水平扩展,用户可以通过增加broker节点来提高Kafka的吞吐量和容错性能。可靠性:Kafka提供多副本备份机制,当某些节点故障时,可以自动进行副本切换,确保消息不会丢失。持久化:Kafka采用磁盘存储,可以长期保存消息,也可根据需要设置消息的保留时间或删除策略。灵活性和可定制性:Kafka提供各种配置选项,可以根据需要进行灵活定制。大数据生态系统集成:Kafka可以很好地集成到Hadoop、Spark、Storm等大数据处理系统中,提供数据源或目标的功能。

总之,Kafka是一种强大的、高效的、可扩展的、可靠的分布式消息系统,具有广泛的应用场景。

二、Kafka的架构2.1 Kafka的组成部分

Kafka的主要组成部分包括生产者(producer)、消费者(consumer)、Broker、Topic、Partition和Offset等。

生产者:用于产生消息并发送到Kafka的指定Topic,将消息写入Topic的一个或多个Partition中。消费者:用于从Kafka的指定Topic消费消息,并且可以控制对消息的读取速度。Broker:Kafka集群中的一个节点,存储了Topic的分区数据副本。Topic:是消息的逻辑分类,一个Topic可以被划分为多个Partition,每个Partition存储了一部分数据。Partition:是物理存储单元,每个Partition都保存了一个有序的消息序列。Offset:是消息在Partition中的偏移量,消费者可以通过指定Offset来读取指定位置的消息。2.2 Kafka的工作流程

afka的工作流程分为两部分:发布和订阅。

发布:生产者将消息发送到Kafka的指定Topic,Kafka根据Partition数选择合适的Partition存储消息。如果指定了Key,则消息会基于Key进行Hash,并且被写入到特定Partition中;如果没有指定Key,则消息会随机分布到所有Partition中。订阅:消费者从Kafka的指定Topic订阅消息,Kafka会将消息按照Partition顺序进行读取,并且确保每个Partition内的消息顺序不变。消费者可以在每个Partition内指定一个Offset来读取消息,也可以从最新的消息开始读取。三、Kafka的使用场景3.1 Kafka的典型应用场景

Kafka具有以下典型应用场景:

日志收集:Kafka可用于日志文件的收集和处理,生产者将日志写入Kafka的Topic中,而消费者可以消费这些日志并将它们存储到Hadoop、Elasticsearch等系统中。消息队列:Kafka可作为异步消息传递的消息队列,生产者可以将消息放入Kafka的Topic中,而消费者则可以从该Topic中获取消息进行处理。流处理:Kafka可用于实时数据流处理,例如实时预测、实时监控、实时计算等。事件溯源:Kafka可用于事件溯源,将事件记录到Kafka Topic中,并创建事件流以跟踪数据的变化历史。3.2 Kafka的优缺点3.2.1 优点高性能:Kafka具有高吞吐量和低延迟,能够处理大量数据并保证高效的消息传输。可扩展性:Kafka可水平扩展,支持增加Broker节点来提高吞吐量和容错性能。可靠性:Kafka支持多副本备份机制,可以避免故障导致的数据丢失和服务不可用。持久化:Kafka采用磁盘存储,可以长期保存消息,也可根据需要设置消息的保留时间或删除策略。灵活性和可定制性:Kafka提供各种配置选项,可以根据需要进行灵活定制。大数据生态系统集成:Kafka可以很好地集成到Hadoop、Spark、Storm等大数据处理系统中,提供数据源或目标的功能。3.2.2 缺点部署和维护成本高:Kafka需要部署在一个分布式环境中,对于初学者来说,安装、配置和维护是一个挑战。系统复杂度高:Kafka的系统设计和架构比较复杂,需要理解其概念和原理才能正确使用。数据一致性问题:由于存在多个副本,可能会存在数据一致性的问题,需要进行合理的配置和管理。四、Kafka的安装与配置4.1 Kafka的下载与准备Windows平台访问Kafka官网:https://kafka.apache.org/downloads选择合适的版本进行下载解压文件到指定目录,如D:/kafka/下载并安装Java运行环境Linux/Mac平台在终端中执行以下命令下载Kafka最新版本:Codewget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz解压文件到指定目录,如/opt/kafka/:Codetar -xzf kafka_2.13-3.0.0.tgz mv kafka_2.13-3.0.0 /opt/kafka安装Java运行环境4.2 Kafka的安装与启动Windows平台打开cmd窗口,进入Kafka解压目录的bin文件夹执行以下命令启动Kafka服务器: Code.\windows\bin\windows\kafka-server-start.bat .\config\server.propertiesLinux/Mac平台启动Zookeeper服务Codecd /opt/kafka bin/zookeeper-server-start.sh config/zookeeper.properties打开新的终端窗口,进入Kafka解压目录的bin文件夹执行以下命令启动Kafka服务器: Codecd /opt/kafka bin/kafka-server-start.sh config/server.properties4.3 Kafka的配置文件解析

Kafka的配置文件主要包含以下参数:

broker.id:Broker节点的唯一标识。listeners:监听器列表,用于指定Broker节点所提供的服务和端口号。log.dirs:日志存储路径,用于存放Kafka的日志数据。zookeeper.connect:Zookeeper服务器的地址和端口号。advertised.listeners:外部访问Broker节点的地址和端口号,用于跨机器的通信。

示例:

Codebroker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/tmp/kafka-logs zookeeper.connect=localhost:2181 advertised.listeners=PLAINTEXT://your_hostname:9092五、Kafka的基本操作

Kafka是一种高性能、分布式的消息中间件。它能够处理大量的数据流,并将其发送给订阅者。以下是Kafka的基本操作步骤:

安装和启动Kafka

首先,需要从Kafka官网下载Kafka二进制文件,然后解压到指定目录。使用命令行进入Kafka目录,输入以下命令启动Kafka:

Codebin/kafka-server-start.sh config/server.properties

2.创建主题

在Kafka中,消息是通过主题来进行分类的。所以在开始发送和接收消息之前,需要先创建主题。下面是创建主题的示例命令:

Codebin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic

其中,--replication-factor指定主题的复制因子,--partitions指定主题的分区数。

3.发送消息

使用Kafka提供的命令行工具kafka-console-producer.sh来发送消息。示例命令如下:

Codebin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

在该命令中,--broker-list指定Kafka服务器的地址,--topic指定消息的主题。在控制台输入消息内容,按Enter键即可发送消息。

4.接收消息

使用Kafka提供的命令行工具kafka-console-consumer.sh来接收消息。示例命令如下:

Codebin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning

在该命令中,--bootstrap-server指定Kafka服务器的地址,--topic指定要接收的消息主题。--from-beginning表示从最初开始接收消息。

5.删除主题

使用命令行工具kafka-topics.sh来删除主题。示例命令如下:

Codebin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_topic

在该命令中,--delete指定要删除主题,--zookeeper指定Zookeeper的地址和端口。

6.列出所有主题

使用命令行工具kafka-topics.sh来列出所有主题。示例命令如下:

Codebin/kafka-topics.sh --list --zookeeper localhost:2181六、Kafka的高级操作6.1 消息的生产与消费保障

在生产和消费消息时,可以通过指定acks参数和retry机制来保证消息的可靠性。

acks:用于指定需要等待多少个副本成功写入数据后,生产者才会认为消息发送成功。retries:用于指定生产者在发送消息时失败后的重试次数。

以下是Java代码示例: CodeProperties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 3);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");​Producer producer = new KafkaProducer(props);​String topic = "test";String key = "key1";String value = "hello world";​ProducerRecord record = new ProducerRecord(topic, key, value);​try { RecordMetadata metadata = producer.send(record).get(); System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());} catch (InterruptedException e) { e.printStackTrace();} catch (ExecutionException e) { e.printStackTrace();}​producer.close();

6.2 消息的持久化

Kafka的消息是持久化到磁盘上的,可以设置消息的过期时间来自动清理过期消息。在配置文件server.properties中,可以设置以下参数:

log.retention.hours:消息保存的时间,默认为168小时(7天)。log.segment.bytes:每个日志段的大小,默认为1GB。log.cleanup.policy:指定日志清理策略。

以下是Java代码示例:

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ​ Producer producer = new KafkaProducer(props); ​ String topic = "test"; String key = "key1"; String value = "hello world"; ​ ProducerRecord record = new ProducerRecord(topic, key, value); ​ producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition()); } else { exception.printStackTrace(); } } }); ​ producer.close();6.3 Kafka的集群模式

Kafka支持分布式的集群模式,可以通过添加Broker节点来提高集群的吞吐量和容错性能。在配置文件server.properties中,可以设置以下参数:

broker.id:每个Kafka节点都需要唯一的Broker ID,这个ID需要在集群中是唯一的。listeners:Kafka的监听器,用于处理客户端和Broker之间的网络通信。默认为PLAINTEXT协议,也可以配置其他安全协议如SSL。log.dirs:Kafka的日志目录,用于存储Topic的消息数据。zookeeper.connect:Zookeeper的连接参数,用于集群管理。

以下是Java代码示例:

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ​ Producer producer = new KafkaProducer(props); ​ String topic = "test"; String key = "key1"; String value = "hello world"; ​ ProducerRecord record = new ProducerRecord(topic, key, value); ​ producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition()); } else { exception.printStackTrace(); } } }); ​ producer.close();6.4 数据迁移及主题管理

Kafka支持对现有的Topic进行数据迁移和拆分,以及删除和创建Topic。在Kafka集群中,可以使用命令行工具kafka-topics来执行这些操作。

以下是Java代码示例:

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); ​ AdminClient client = AdminClient.create(props); ​ String topic = "test"; ​ NewTopic newTopic = new NewTopic(topic, 3, (short)2); client.createTopics(Collections.singleton(newTopic)); ​ DescribeTopicsResult result = client.describeTopics(Collections.singleton(topic)); Map topicDescriptions = result.all().get(); ​ for (TopicDescription topicDescription : topicDescriptions.values()) { System.out.println("Topic: " + topicDescription.name() + ", Partitions: " + topicDescription.partitions().size()); } ​ client.close();6.5 Broker间数据同步

Kafka支持集群环境下的Broker间数据同步。当一个Broker接收到消息后,它会将消息保存到本地磁盘上,并将消息的副本发送给其他Broker。这样,即使某个Broker出现故障,其他Broker仍然可以提供数据服务。

Kafka中的数据同步是通过Zookeeper来协调的。当Broker加入或退出Kafka集群时,Zookeeper会负责通知集群中的其他Broker,从而实现数据同步。

为了确保数据同步效果,Kafka在默认情况下会将消息的副本发送到多个Broker上。这里的复制因子可以通过配置文件来修改,默认值为1。

6.6 消息过期时间

在实际应用中,我们可能需要对消息进行过期处理,以避免消息堆积导致系统资源浪费。Kafka提供了消息过期时间的功能,在消息发送时可以指定消息过期时间,Kafka会自动删除已经过期的消息。

Kafka中消息过期时间是通过在Producer端设置消息的timestamp属性实现的。具体来说,可以通过以下两种方式设置消息过期时间:

在Producer端设置消息过期时间javaCopy Codelong currentTime = System.currentTimeMillis(); // 获取当前时间long expireTime = currentTime + expirationTime; // 计算消息过期时间ProducerRecord record = new ProducerRecord("my_topic", messageKey, messageValue);record.timestamp(expireTime); // 设置消息的过期时间producer.send(record);在Kafka Broker端设置全局的消息过期时间在Kafka配置文件中添加如下配置:Copy Codelog.retention.hours=24上述配置表示Kafka Broker会自动删除24小时之前的消息。6.7 消费者组

在实际应用中,我们通常需要将同一组消费者同时消费同一个主题下的消息。这时就需要使用消费者组来协调消息的消费情况。

Kafka消费者组是由多个消费者实例组成的,每个消费者实例都负责消费主题下的一部分消息。当一个新消费者加入消费者组时,它会接收到尚未被消费的消息。当某个消费者出现故障时,Kafka会自动将该消费者负责消费的消息分配给其他消费者。

Kafka中的消费者组是通过使用相同的group.id来关联一组消费者实现的。例如,以下代码创建了一个名为my_consumer_group的消费者组:

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my_consumer_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); ​ KafkaConsumer consumer = new KafkaConsumer(props);6.8 消息过滤

在实际应用中,我们可能需要对消息进行过滤,只选择部分消息进行消费。Kafka提供了多种方式来实现消息过滤,包括按主题、按消息头、按消息内容等。

以下是按主题过滤消息的示例代码:

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my_consumer_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); ​ KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my_topic"));

上述代码中,使用subscribe方法订阅名为my_topic的主题,该主题下的所有消息都会被消费。

6.9 自定义分区器

在Kafka中,每个主题可以由多个分区组成。当Producer发送消息时,需要指定消息所属的分区。Kafka默认提供了一些分区策略,但是在实际应用中,我们可能需要自定义分区器来满足特定的业务需求。

自定义分区器需要实现org.apache.kafka.clients.producer.Partitioner接口。以下是一个自定义的分区器示例:

Codepublic class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int partition = 0; if (key == null) { partition = ThreadLocalRandom.current().nextInt(numPartitions); } else { partition = Math.abs(key.hashCode() % numPartitions); } return partition; } ​ @Override public void close() { // do nothing } ​ @Override public void configure(Map configs) { // do nothing } }

上述代码中,MyPartitioner类实现了partition方法,根据消息的key值计算出要发送到哪个分区。如果消息的key值为null,那么就随机选择一个分区。该分区器还可以通过configure方法进行配置,这里并没有进行任何配置。

七、Kafka的应用实践7.1 实现高可用性、高并发度的日志系统

Kafka可以作为高可用性和高并发度的日志系统,采用以下方案:

使用多个Broker组成Kafka集群,提供高并发度和高可用性。使用Zookeeper来进行集群管理和节点选举,防止单点故障。使用Producer将日志信息写入Kafka的Topic中,消费者可以从Topic中读取消息进行处理。使用Flume或Logstash等工具来实现日志的采集和预处理。

示例:

Java代码示例:使用Kafka Java客户端向名为“log”的主题发送一条日志消息;

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ​ Producer producer = new KafkaProducer(props); ​ String topic = "log"; String key = "key1"; String value = "hello world"; ​ ProducerRecord record = new ProducerRecord(topic, key, value); ​ producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition()); } else { exception.printStackTrace(); } } }); ​ producer.close();7.2 基于Kafka的数据传输与处理

Kafka可以作为数据传输和处理的框架,采用以下方案:

使用Producer将数据写入Kafka的Topic中,消费者可以从Topic中读取数据并进行处理。使用Kafka Connect来将数据源连接到Kafka集群。使用Kafka Streams API来进行流处理。

示例:

Java代码示例:使用Kafka Java客户端向名为“data”的主题发送一条数据消息;

CodeProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ​ Producer producer = new KafkaProducer(props); ​ String topic = "data"; String key = "key1"; String value = "{ \"id\":1, \"name\":\"Alice\", \"age\":30 }"; ​ ProducerRecord record = new ProducerRecord(topic, key, value); ​ producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition()); } else { exception.printStackTrace(); } } }); ​ producer.close();7.3 将Kafka集成到大数据处理系统中

将Kafka集成到大数据处理系统中可以采用以下方案:

使用Hadoop MapReduce来读取和写入Kafka的数据。使用Apache Spark Streaming来进行实时流数据处理。使用Apache Flink来进行实时流数据处理。

示例:

Java代码示例:使用Apache Spark Streaming从名为“data”的主题读取数据,并进行词频统计;CodeSparkConf conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local");JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));​Map kafkaParams = new HashMap();kafkaParams.put("bootstrap.servers", "localhost:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", "group1");kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", false);​Collection topics = Arrays.asList("data");​final JavaInputDStream stream = KafkaUtils.createDirectStream( jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams) );​JavaDStream lines = stream.map(ConsumerRecord::value);​JavaDStream words = lines.flatMap(x -> Arrays.asList(x.split("\\s+")).iterator());​JavaPairDStream pairs = words.mapToPair(s -> new Tuple2(s, 1));JavaPairDStream wordCounts = pairs.reduceByKey(Integer::sum);​wordCounts.print();​jssc.start();jssc.awaitTermination();

7.4 将Kafka与SpringBoot集成

将Kafka与Spring Boot集成可以通过Spring Boot提供的spring-kafka模块来实现。以下是实现步骤:

7.4.1 在pom.xml文件中添加spring-kafka依赖:xmlCopy Code org.springframework.kafka spring-kafka ${spring-kafka.version} 7.4.2创建Kafka配置类,配置Kafka属性:Code@Configuration @EnableKafka public class KafkaConfig { ​ @Value("${kafka.bootstrap-servers}") private String bootstrapServers; ​ @Bean public Map producerConfigs() { Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } ​ @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory(producerConfigs()); } ​ @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }7.4.3 创建Kafka消息生产者类:@Component public class KafkaProducer { ​ @Autowired private KafkaTemplate kafkaTemplate; ​ @Value("${kafka.topic}") private String topic; ​ public void sendMessage(String message) { kafkaTemplate.send(topic, message); } }7.4.4 在application.properties文件中配置Kafka属性:Codekafka.bootstrap-servers=localhost:9092 kafka.topic=my_topic7.4.5 创建RESTful API控制器,通过KafkaProducer发送消息:Code@RestController @RequestMapping("/api") public class MessageController { ​ @Autowired private KafkaProducer kafkaProducer; ​ @PostMapping("/message") public ResponseEntity sendMessage(@RequestBody String message) { kafkaProducer.sendMessage(message); return new ResponseEntity(HttpStatus.OK); } }在Spring Boot应用程序中使用@EventListener注解监听Kafka消息: Code@Service public class KafkaConsumer { ​ @EventListener public void onMessage(ConsumerRecord record) { System.out.println("Received message: " + record.value()); } }7.4.6 示例中的代码仅仅是最基本的实现,可以在此基础上进一步拓展和优化。

将Kafka与Spring Boot集成可以通过Spring Boot提供的spring-kafka模块来实现。以下是实现步骤:

在pom.xml文件中添加spring-kafka依赖:Code org.springframework.kafka spring-kafka ${spring-kafka.version} 创建Kafka配置类,配置Kafka属性: Code@Configuration @EnableKafka public class KafkaConfig { ​ @Value("${kafka.bootstrap-servers}") private String bootstrapServers; ​ @Bean public Map producerConfigs() { Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } ​ @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory(producerConfigs()); } ​ @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

3.创建Kafka消息生产者类:

Code@Component public class KafkaProducer { ​ @Autowired private KafkaTemplate kafkaTemplate; ​ @Value("${kafka.topic}") private String topic; ​ public void sendMessage(String message) { kafkaTemplate.send(topic, message); } }

4.在application.properties文件中配置Kafka属性:

Codekafka.bootstrap-servers=localhost:9092 kafka.topic=my_topic

5.创建RESTful API控制器,通过KafkaProducer发送消息:

Code@RestController @RequestMapping("/api") public class MessageController { ​ @Autowired private KafkaProducer kafkaProducer; ​ @PostMapping("/message") public ResponseEntity sendMessage(@RequestBody String message) { kafkaProducer.sendMessage(message); return new ResponseEntity(HttpStatus.OK); } }

6.在Spring Boot应用程序中使用@EventListener注解监听Kafka消息:

Code@Service public class KafkaConsumer { ​ @EventListener public void onMessage(ConsumerRecord record) { System.out.println("Received message: " + record.value()); } }

示例中的代码仅仅是最基本的实现,可以在此基础上进一步拓展和优化。

当将Kafka与Spring Boot集成时,需要特别注意以下几点:

Kafka和Zookeeper的版本需要兼容,否则可能会导致无法连接Zookeeper或者其他的问题。在Kafka生产者中使用KafkaTemplate发送消息时,需要注意设置Kafka主题(topic)的名称。在Kafka消费者中使用@KafkaListener注解监听主题时,需要注意设置主题名称。在Kafka消费者中使用@EventListener注解监听主题时,需要在配置类中添加KafkaListenerContainerFactory来实现对应的KafkaListenerEndpointRegistry。在Kafka消费者中使用@EventListener注解监听主题时,需要让KafkaConsumer实现ApplicationListener接口,并指定泛型为KafkaEvent,监听KafkaEvent事件。


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有